package mp;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class WordCountAppOther {
	
	/**
	 * KEYIN	即k1		表示行的偏移量(每一行的起始位置)	Long-->LongWritable
	 * VALUEIN	即v1		表示每一行文本内容		String-->Text
	 * KEYOUT	即k2		表示行中出现的单词(每一行中的每个单词)
	 * VALUEOUT	即v2		表示行中出现的单词的次数，固定值1(每一行中的每个单词的出现次数,固定值1)
	 */
	static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
		
		 /**
	      *   通过Mapper的源码可知，map()方法张的k1和v1，分别来自于KEYIN和VALUEIN,即k1和v1
	     */
		@Override
		protected void map(LongWritable k1, Text v1, Context context)
				throws IOException, InterruptedException {
			String[] split = v1.toString().split("\t");
			for(String word : split){
				System.out.println("k1:" + k1 + "--------" + word);
				context.write(new Text(word), new LongWritable(1L));
			}
			
		};
		
	}
	
	/**
	 * KEYIN	即k2		表示行中出现的单词
	 * VALUEIN	即v2		表示行中出现的单词的次数
	 * KEYOUT	即k3		表示文本中出现的不同单词(整个文件中的不同单词)
	 * VALUEOUT	即v3		表示文本中出现的不同单词的总次数(整个文件中的不同单词出现总次数)
	 *
	 */
	static class MyReduce extends Reducer<Text, LongWritable, Text, LongWritable>{

		@Override
		protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s,
				Context context) throws IOException, InterruptedException {
			long times = 0L;
			for(LongWritable count : v2s){
				times += count.get();
			}
			System.out.println("k2:" + k2 + "--------" + times);
			context.write(k2, new LongWritable(times));
		};

	}
	
	static final String INPUT_PATH = "hdfs://54.223.235.209:9000/hello";
	static final String OUT_PATH = "hdfs://54.223.235.209:9000/out";
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
		final Path outPath = new Path(OUT_PATH);
		if(fileSystem.exists(outPath)){
			fileSystem.delete(outPath, true);
		}
		
		
		
		Job job = new Job(conf, WordCountAppOther.class.getSimpleName());
		
		//1.1 指定读取的文件位于哪里
		FileInputFormat.setInputPaths(job, INPUT_PATH);
		//指定如何对输入文件进行格式化，把输入文件每一行解析成键值对
		//job.setInputFormatClass(TextInputFormat.class);
		
		//1.2指定自定义的map类
		job.setMapperClass(MyMapper.class);
		//map输出的<k,v>类型。如果<k3,v3>的类型与<k2,v2>类型一致，则可以省略
		//job.setMapOutputKeyClass(Text.class);//Hadoop中的Text，代表Java中的String
		//job.setMapOutputValueClass(LongWritable.class);//Hadoop中的LongWritable，代表Java中的Long
		
		//1.3分区(shuffle)
		//job.setPartitionerClass(HashPartitioner.class);
		//有一个reduce任务运行
		//job.setNumReduceTasks(1);
		
		//1.4排序，分组
		
		//1.5归约
		
		//2.2指定自定义reduce类
		job.setReducerClass(MyReduce.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);
		
		//2.3指定写出到哪里
		FileOutputFormat.setOutputPath(job, outPath);
		//指定输出文件的格式化类
		//job.setOutputFormatClass(TextOutputFormat.class);
		
		//把job提交给JobTracker运行
		job.waitForCompletion(true);
	}

}
